feat: Plumb Parquet virtual columns (row_number) through TableSchema and ParquetOpener #22026
mbutrovich wants to merge 17 commits into …
Conversation
…and ParquetOpener, gated behind a tested-only extension-type allowlist, to unblock Comet's native-DataFusion support for Spark's _tmp_metadata_row_index.
|
My main concern is #22026 (comment). The various schemas in …
Thanks for the review @adriangb! Agreed it could make things more complicated, but if DataFusion is ever going to support these virtual columns it might be unavoidable. I think it's good to hash this stuff out in the smallest possible PR at the opener level. I'll push an update later today. |
|
Thanks again for the review @adriangb! Hopefully I addressed all of the feedback, but happy to keep chatting about it.
- Mixed virtual/file predicates with `pushdown_filters=true`: confirmed the silent-drop bug with failing tests. Root cause: arrow-rs can't accept virtual-column refs in a `RowFilter`. Fix: added …
- Defense-in-depth in the opener for callers who bypass the optimizer (e.g. manual plan builders): …
- Tests: …
- Ordering doc on …
- Struct field doc now spells out the …
- Enum + …
- Added …
|
I think this would then have a negative interaction with the goal of turning filter pushdown on by default. Maybe we'll always have to apply some filters as a `FilterExec`.
Comet conservatively never removes … Wouldn't this only prevent filter pushdown for filters that reference virtual columns?
Yeah, but it means we'll have to keep the split forever. Which might have been the case anyway, and maybe a non-issue. And any filter that does reference virtual columns cannot be pushed down even if a part of it would benefit from doing so, e.g. …
|
I plan to give this another review tomorrow. |
|
run benchmark tpch tpcds |
|
@mbutrovich from a high-level perspective, how …
|
🤖 Benchmark running (GKE): comparing virtual-columns-table-schema (bd513ec) to 2c7af17 (merge-base) using tpcds.
|
🤖 Benchmark running (GKE): comparing virtual-columns-table-schema (bd513ec) to 2c7af17 (merge-base) using tpch.
|
🤖 Benchmark completed (GKE): tpch results for base (merge-base) and branch.
|
🤖 Benchmark completed (GKE): tpcds results for base (merge-base) and branch.
|
Following up on this: I added … File-vs-partition collisions in …
/// Virtual columns are appended at the end of the table schema, after any
/// partition columns.
pub fn with_virtual_columns(mut self, virtual_columns: Vec<FieldRef>) -> Self {
    debug_assert!(
Should we raise an error if a name collision is found? `SchemaBuilder` won't check this.
Thanks for the question @niebayes: I addressed this in #22026 (comment). My argument is that we should handle this in #20135 like partition columns. The debug assertion merely expresses the contract in code, without runtime overhead in release builds.
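A minimal sketch of the kind of uniqueness check such a debug assertion expresses, assuming a `fields` slice holding the combined file, partition, and virtual columns; `assert_no_duplicate_names` is a hypothetical helper, not the PR's actual code.

```rust
use std::collections::HashSet;

use arrow_schema::FieldRef;

/// Hypothetical helper: panics in debug builds if any two columns in the
/// combined [file, partition, virtual] layout share a name. Release builds
/// skip the check entirely, matching the "no runtime overhead" point above.
fn assert_no_duplicate_names(fields: &[FieldRef]) {
    debug_assert!(
        fields
            .iter()
            .map(|f| f.name().as_str())
            .collect::<HashSet<_>>()
            .len()
            == fields.len(),
        "virtual column names must not collide with file or partition columns"
    );
}
```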
|
I made a hybrid (genAI + manual) initial review.
    partitioned_file: PartitionedFile,
) -> Result<PreparedParquetOpen> {
    validate_supported_virtual_columns(self.table_schema.virtual_columns())?;
    if self.pushdown_filters
So if `self.pushdown_filters` is not set but a predicate exists, we don't need validation? Not sure if this is expected.
|
|
Thanks @mbutrovich, I'm planning to do the second round today. @adriangb please ping me if you feel there are any anticipated blockers or discussions needed.
/// above the scan; this check is defense-in-depth for callers that build plans
/// manually and set `with_pushdown_filters(true)` alongside a predicate
/// referencing virtual columns.
fn validate_predicate_does_not_reference_virtual_columns(
Instead of erroring, could we reject just these filters during filter pushdown?
Thanks @adriangb, want to make sure I understand the intent before I refactor, since the two consumer shapes we have land in different places:
- DataFusion front-end path: the planner calls `try_pushdown_filters`, which already reports virtual-col filters as `PushedDown::No` and excludes them from `source.predicate`. The `FilterExec` stays above the scan, so the opener never sees a virtual-col ref in the first place.
- Direct-opener consumers (Comet, and anyone else bypassing the optimizer): they construct the predicate and set `with_pushdown_filters(true)` themselves. No `try_pushdown_filters` call, no guarantee the predicate was split, no `FilterExec` above unless they built one.
The opener-level check only matters for the second group. If we silently split and drop virtual-col conjuncts, those callers get wrong results with no signal; the current error tells them the contract and how to fix it.
Is your suggestion:
- (a) Drop the opener check entirely and rely on the `try_pushdown_filters` boundary, accepting that direct-opener callers are on their own?
- (b) Keep the check but split the predicate in the opener so non-virtual conjuncts still participate in row-filtering / pruning, rather than erroring on the whole predicate? (Sketched below.)
- Something else?
Happy to go either way, just want to make sure we're not setting a footgun for the Comet shape.
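For option (b), a rough sketch of the conjunct split (not the PR's actual helper): recursively break an AND-ed predicate into its conjuncts so a caller could keep only the ones that avoid virtual columns; the filtering step itself is left out here.

```rust
use std::sync::Arc;

use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::PhysicalExpr;

/// Recursively splits `expr` on top-level ANDs and pushes each conjunct into
/// `out`. A caller implementing option (b) would then retain only the
/// conjuncts whose column references are all file/partition columns.
fn split_conjuncts(expr: &Arc<dyn PhysicalExpr>, out: &mut Vec<Arc<dyn PhysicalExpr>>) {
    if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() {
        if *binary.op() == Operator::And {
            split_conjuncts(binary.left(), out);
            split_conjuncts(binary.right(), out);
            return;
        }
    }
    out.push(Arc::clone(expr));
}
```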
I see. I didn't realize Comet sets the filters directly without going through try_pushdown_filters. My suggestion (which may not be possible) would be that Comet and other similar consumers follow the same contract the rest of DataFusion uses and go through try_pushdown_filters. If needed we could extract some bit of the optimizer rule into public functions to avoid you having to re-implement the logic of placing a FilterExec above for rejected filters, etc, but I also think that should not be that complicated.
Does that make sense or are there reasons why that would not work?
@AdamGS I know you mentioned wanting this feature, does this contract make sense for your use case? Basically, requiring try_pushdown_filters first? I'm afraid of not enforcing the contract in the opener for consumers who bypass try_pushdown_filters.
I think the middle ground I would like is: enforce the contract with a debug assertion in the opener, so it's not on the critical path of release builds, but developers building on top of DataFusion (particularly in dev or CI builds with debug assertions enabled) will get an error. Not having that feels like too much of a footgun to me, since we don't have a good way of enforcing the contract that `try_pushdown_filters` was called first. What do you think @adriangb?
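A minimal sketch of that middle ground, assuming the opener has the predicate and the table's virtual columns in hand; `debug_check_no_virtual_refs` is hypothetical and not what the PR ultimately merged (the final version surfaces a real error instead).

```rust
use std::sync::Arc;

use arrow_schema::FieldRef;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::PhysicalExpr;

/// Hypothetical debug-only contract check: fires only when debug assertions
/// are enabled, so release builds pay nothing, while dev/CI builds catch
/// callers that set a predicate referencing virtual columns without going
/// through `try_pushdown_filters` first.
fn debug_check_no_virtual_refs(
    predicate: &Arc<dyn PhysicalExpr>,
    virtual_columns: &[FieldRef],
) {
    debug_assert!(
        !collect_columns(predicate)
            .iter()
            .any(|col| virtual_columns.iter().any(|v| v.name().as_str() == col.name())),
        "predicate references a virtual column; run try_pushdown_filters before \
         enabling pushdown_filters on the Parquet source"
    );
}
```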
I am testing using try_pushdown_filters instead of with_predicate in Comet:
That sounds like a good plan to me. If we can have try_pushdown_filters handle the rejection gracefully and the opener itself error that sounds like an ideal solution.
Cool, I'll move forward with that refactor. Thanks for talking through it with me!
Hopefully should be good for one more look @adriangb.
adriangb left a comment:
I think this looks good. @mbutrovich do you intend to get this into 54? I think there's something to be said for waiting until 54 goes out at this point, so we can do the rest of the wiring-up work and derisk the design as a whole.
I'd like it in 54 if we think the API at this layer is stable, but I see your argument that if the API needs a tweak when we go to hook everything up, we hit API stability challenges. I am okay to defer, but I also was not planning to do the work to hook it up to the front-end any time soon, so it becomes an indefinite merge and maybe not completely wired up in 55 either.
|
Gotcha. If you're okay deferring until 54 is out (which should just be a week or two), I think that'd make me feel more comfortable taking the risk. We don't officially have feature freezes, but I think it's a good general approach to take. I asked in #20135 (comment) if anyone can drive the rest of this, but I'd say once 54 is out we can merge this regardless. Thanks for working on this, it's been quite the effort!
|
No worries. This isn't urgently needed in Comet, it's just on the list of Spark gaps we want to close. Thanks for your help thus far! |
Which issue does this PR close?
… `_tmp_metadata_row_index`).

Rationale for this change
arrow-rs 57.1.0+ supports Parquet virtual columns (`row_number`, `row_group_index`) via `ArrowReaderOptions::with_virtual_columns`, and DataFusion pins a new-enough arrow-rs for the API to be available. DataFusion does not yet plumb the option through `ParquetOpener`, so consumers (notably Comet) cannot project Spark's `_tmp_metadata_row_index` through the native_datafusion scan path.

This PR adds the minimal opener-boundary plumbing so `TableSchema` can carry virtual columns and the Parquet reader produces them. UX / SQL-layer surface for virtual columns stays deferred to the epic in #20135; this follows the same framing alamb blessed for #20071 (the `input_file_name()` UDF).

What changes are included in this PR?
- `TableSchema::with_virtual_columns(...)` builder + `virtual_columns()` getter. Layout: `[file, partition, virtual]`. Composable with `with_table_partition_cols` in either order.
- `TableSchema::schema_without_virtual_columns()`: the file + partition schema used by pushdown-planning paths that can't evaluate virtual-col refs.
- `ParquetOpener` forwards the fields to `ArrowReaderOptions::with_virtual_columns`; augments the schemas passed to the expr-adapter / simplifier with virtual fields so virtual-col refs identity-rewrite; strips them from the projection fed to `ProjectionMask::roots` (which only understands file columns) and appends them to `stream_schema` so `reassign_expr_columns` resolves them by name.
- `ParquetVirtualColumn` enum with `TryFrom<&FieldRef>` (in `datasource-parquet::virtual_column`) gates which arrow-rs virtual extension types are accepted (a minimal sketch follows this list). Currently only `RowNumber`; adding a variant (e.g. `RowGroupIndex`) is a compile-time obligation. Replaces the earlier runtime string allowlist so the contract lives in the type system.
- `ParquetSource::try_pushdown_filters` classifies filters against the file+partition schema (not the full table schema) so predicates referencing virtual columns are reported as `PushedDown::No` and the `FilterExec` stays above the scan; arrow-rs's `RowFilter` addresses parquet leaves only and can't evaluate virtual-column refs, so silently pushing them down would produce wrong results.
- `build_virtual_columns_state` (run once per scan partition at morselizer-build time) errors when `pushdown_filters=true` and the predicate references a virtual column, with a clear remediation message pointing at `try_pushdown_filters`. This catches callers that bypass the optimizer and set the predicate on `ParquetSource` directly.
- `arrow-schema` added as a direct dep (previously transitive via `arrow`) so the enum references `RowNumber::NAME` from arrow-rs instead of hardcoding the string.
- Deferred: `ListingTable` / SQL-layer surface, a three-arg constructor on `TableSchema`, `ParquetSource::with_virtual_columns`, and `RowGroupIndex` support.
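A minimal sketch of the allowlist enum described above. The import path for arrow-rs's `RowNumber` extension type and the exact error wording are assumptions; the real `ParquetVirtualColumn` in this PR may differ in detail.

```rust
use arrow_schema::extension::{ExtensionType, RowNumber}; // RowNumber path assumed
use arrow_schema::FieldRef;
use datafusion_common::DataFusionError;

/// Virtual columns the Parquet opener is willing to forward to arrow-rs.
/// Adding a variant (e.g. `RowGroupIndex`) later forces every `match` on this
/// enum to be revisited, which is the "compile-time obligation" noted above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParquetVirtualColumn {
    RowNumber,
}

impl TryFrom<&FieldRef> for ParquetVirtualColumn {
    type Error = DataFusionError;

    fn try_from(field: &FieldRef) -> Result<Self, Self::Error> {
        // The extension type name travels in the field metadata under the
        // canonical ARROW:extension:name key.
        match field.extension_type_name() {
            Some(name) if name == RowNumber::NAME => Ok(Self::RowNumber),
            Some(other) => Err(DataFusionError::NotImplemented(format!(
                "unsupported Parquet virtual column extension type: {other}"
            ))),
            None => Err(DataFusionError::NotImplemented(format!(
                "field {} has no extension type, so it is not a virtual column",
                field.name()
            ))),
        }
    }
}
```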
Are these changes tested?

Yes. New unit tests in `opener.rs`:
- `test_row_index_basic`: single row group, select data + `row_number`.
- `test_row_index_projection_only`: select only `row_number`.
- `test_row_index_multi_row_group`: 3 × 100 rows, verify absolute 0..300 across boundaries.
- `test_row_index_with_row_group_skip`: the predicate stats-prunes the middle row group; verify row numbers stay absolute (0..100 ++ 200..300). Critical correctness gate for Spark (and for "Fix `RowNumberReader` when not all row groups are selected", arrow-rs#8863).
- `test_row_index_with_partition_cols`: partition + virtual + data columns compose correctly.
- `test_row_index_nullable_int64`: the nullability flag flows through unchanged (matches Spark's `_tmp_metadata_row_index` declaration).
- `test_unsupported_virtual_extension_type_rejected`: using `RowGroupIndex` (a real arrow-rs type deliberately not in the enum yet) errors with `NotImplemented` instead of silently forwarding.
- `test_row_index_predicate_pushdown_mixed_or_errors` / `_virtual_only_errors` / `_allowed_when_pushdown_disabled`: exercise the opener's defensive check for virtual-col predicate refs with `pushdown_filters=true`, and confirm the `pushdown_filters=false` path is unaffected.

In `source.rs`: `test_try_pushdown_filters_rejects_virtual_column_refs` pins the planner-boundary contract; file-col filters are `PushedDown::Yes`, virtual-only and mixed filters are `PushedDown::No`.

In `virtual_column.rs`: unit tests covering `TryFrom<&FieldRef>` for valid, missing-extension-type, and unsupported-extension-type inputs.

Plus a `TableSchema` unit test verifying the `[file, partition, virtual]` layout is stable regardless of builder-call order.

Are there any user-facing changes?
Public API additions:
`TableSchema::with_virtual_columns(...)`, `TableSchema::virtual_columns()`, `TableSchema::schema_without_virtual_columns()`, and `ParquetVirtualColumn` (re-exported from `datafusion-datasource-parquet`). No existing API changed; no breaking changes.
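A sketch of the intended call shape for the new API, based only on the description above: the crate path, the two-arg `TableSchema::new`, and the virtual field's construction (its extension-type metadata is elided) are assumptions, not code from the PR.

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_datasource::TableSchema; // re-export path assumed

fn build_table_schema() -> TableSchema {
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, true),
    ]));
    let partition_cols = vec![Arc::new(Field::new("dt", DataType::Utf8, false))];
    // In real use this field would also carry arrow-rs's row_number extension
    // type in its metadata; that construction is omitted here.
    let row_number = Arc::new(Field::new("_tmp_metadata_row_index", DataType::Int64, true));

    // Layout ends up [file, partition, virtual] regardless of builder-call order.
    TableSchema::new(file_schema, partition_cols).with_virtual_columns(vec![row_number])
}
```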